import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Test page that exercises rxdart's `doOn*` stream extensions and
/// [DoStreamTransformer].
///
/// Every case is registered from the constructor through the `group` / `test`
/// helpers. Checks go through a single-argument `expect` helper declared
/// outside this file (NOTE(review): presumably provided by [TestPage]; its
/// exact pass/fail criteria are not visible here).
class DoTestPage extends TestPage {
  /// Creates the page and registers all `doOn*` test cases.
  DoTestPage(super.title) {
    group('DoStreamTranformer', () {
      test('calls onDone when the stream is finished', () async {
        var onDoneCalled = false;
        final stream = Stream<void>.empty().doOnDone(() => onDoneCalled = true);

        expect(stream);
        expect(onDoneCalled);
      });

      test('calls onError when an error is emitted', () async {
        var onErrorCalled = false;
        final stream = Stream<void>.error(Exception())
            .doOnError((e, s) => onErrorCalled = true);

        expect(stream);
        expect(onErrorCalled);
      });

      test(
          'onError only called once when an error is emitted on a broadcast stream',
          () async {
        var count = 0;
        final subject = BehaviorSubject<int>(sync: true);
        final stream = subject.stream.doOnError((e, s) => count++);

        // Two listeners share the same transformed stream.
        stream.listen(null, onError: (dynamic e, dynamic s) {});
        stream.listen(null, onError: (dynamic e, dynamic s) {});

        subject.addError(Exception());
        subject.addError(Exception());

        expect(count);
        await subject.close();
      });

      test('calls onCancel when the subscription is cancelled', () async {
        var onCancelCalled = false;
        final stream = Stream.value(1);

        await stream
            .doOnCancel(() => onCancelCalled = true)
            .listen(null)
            .cancel();

        expect(onCancelCalled);
      });

      test('awaits onCancel when the subscription is cancelled', () async {
        // Both markers start at the same value; each is overwritten with the
        // next sequence number when its step finishes, so their relative
        // order records which step completed first.
        var onCancelCompleted = 10, onCancelHandled = 10, eventSequenceCount = 0;
        final stream = Stream.value(1);

        await stream
            .doOnCancel(() =>
                Future<void>.delayed(const Duration(milliseconds: 100))
                    .whenComplete(() => onCancelHandled = ++eventSequenceCount))
            .listen(null)
            .cancel()
            .whenComplete(() => onCancelCompleted = ++eventSequenceCount);

        expect(onCancelCompleted > onCancelHandled);
      });

      test(
          'onCancel called only once when the subscription is multiple listeners',
          () async {
        var count = 0;
        final subject = BehaviorSubject<int>(sync: true);
        final stream = subject.doOnCancel(() => count++);

        await stream.listen(null).cancel();
        await stream.listen(null).cancel();

        expect(count);
        await subject.close();
      });

      test('calls onData when the stream emits an item', () async {
        var onDataCalled = false;
        final stream = Stream.value(1).doOnData((_) => onDataCalled = true);

        expect(stream);
        expect(onDataCalled);
      });

      test('onData only emits once for broadcast streams with multiple listeners',
          () async {
        final actual = <int>[];
        final controller = StreamController<int>.broadcast(sync: true);
        final stream = controller.stream
            .transform(DoStreamTransformer(onData: actual.add));

        stream.listen(null);
        stream.listen(null);

        controller.add(1);
        controller.add(2);

        expect(actual);
        await controller.close();
      });

      test('onData only emits once for subjects with multiple listeners',
          () async {
        final actual = <int>[];
        final controller = BehaviorSubject<int>(sync: true);
        final stream = controller.stream
            .transform(DoStreamTransformer(onData: actual.add));

        stream.listen(null);
        stream.listen(null);

        controller.add(1);
        controller.add(2);

        expect(actual);
        await controller.close();
      });

      test('onData only emits correctly with ReplaySubject', () async {
        final controller = ReplaySubject<int>(sync: true)
          ..add(1)
          ..add(2);
        final actual = <int>[];

        await controller.close();

        // The closed subject is listened to twice; `actual` is reset between
        // the two passes so each pass is collected on its own.
        expect(await controller.stream.doOnData(actual.add).drain(actual));

        actual.clear();

        expect(await controller.stream.doOnData(actual.add).drain(actual));
      });

      test('emits onEach Notifications for Data, Error, and Done', () async {
        final actual = <Notification<int>>[];
        final exception = Exception();
        final stream = Stream.value(1)
            .concatWith([Stream<int>.error(exception)]).doOnEach((notification) {
          actual.add(notification);
        });

        expect(stream);

        expect(actual);
      });

      test('onEach only emits once for broadcast streams with multiple listeners',
          () async {
        var count = 0;
        final controller = StreamController<int>.broadcast(sync: true);
        final stream =
            controller.stream.transform(DoStreamTransformer(onEach: (_) {
          count++;
        }));

        stream.listen(null);
        stream.listen(null);

        controller.add(1);
        controller.add(2);

        expect(count);
        await controller.close();
      });

      test('calls onListen when a consumer listens', () async {
        var onListenCalled = false;
        final stream = Stream<void>.empty().doOnListen(() {
          onListenCalled = true;
        });

        expect(stream);
        expect(onListenCalled);
      });

      test(
          'calls onListen once when multiple subscribers open, without cancelling',
          () async {
        var onListenCallCount = 0;
        final sc = StreamController<int>.broadcast()
          ..add(1)
          ..add(2)
          ..add(3);

        final stream = sc.stream.doOnListen(() => onListenCallCount++);

        stream.listen(null);
        stream.listen(null);

        expect(onListenCallCount);
        await sc.close();
      });

      test(
          'calls onListen every time after all previous subscribers have cancelled',
          () async {
        var onListenCallCount = 0;
        final sc = StreamController<int>.broadcast()
          ..add(1)
          ..add(2)
          ..add(3);

        final stream = sc.stream.doOnListen(() => onListenCallCount++);

        await stream.listen(null).cancel();
        await stream.listen(null).cancel();

        expect(onListenCallCount);
        await sc.close();
      });

      test('calls onPause and onResume when the subscription is', () async {
        var onPauseCalled = false, onResumeCalled = false;
        // Completed from onDone so the test body does not return before the
        // checks inside that callback have actually run.
        final done = Completer<void>();
        final stream = Stream.value(1).doOnPause(() {
          onPauseCalled = true;
        }).doOnResume(() {
          onResumeCalled = true;
        });

        stream.listen(null, onDone: () {
          try {
            expect(onPauseCalled);
            expect(onResumeCalled);
          } finally {
            // Always release the test, even if a check throws.
            done.complete();
          }
        })
          ..pause()
          ..resume();

        await done.future;
      });

      test('should be reusable', () async {
        var callCount = 0;
        final transformer = DoStreamTransformer<int>(onData: (_) {
          callCount++;
        });

        // The same transformer instance is applied to two separate streams.
        final streamA = Stream.value(1).transform(transformer),
            streamB = Stream.value(1).transform(transformer);

        expect(streamA);
        expect(streamB);

        expect(callCount);
      });

      test('throws an error when no arguments are provided', () {
        expect(() => DoStreamTransformer<void>());
      });

      test('should propagate errors', () {
        // Each section makes one doOn* callback throw and hands whatever
        // reaches the listener's onError (or the guarded zone) to `expect`.
        Stream.value(1)
            .doOnListen(() => throw Exception('catch me if you can! doOnListen'))
            .listen(
              null,
              onError: (Object e, StackTrace s) => expect(e),
            );

        Stream.value(1)
            .doOnData((_) => throw Exception('catch me if you can! doOnData'))
            .listen(
              null,
              onError: (Object e, StackTrace s) => expect(e),
            );

        Stream<void>.error(Exception('oh noes!'))
            .doOnError(
                (_, __) => throw Exception('catch me if you can! doOnError'))
            .listen(
              null,
              onError: (Object e, StackTrace s) => expect(e),
            );

        // No onError handler is attached to these listeners; errors are
        // routed to the zone's handler instead.
        runZonedGuarded(
          () {
            Stream.value(1)
                .doOnCancel(() =>
                    throw Exception('catch me if you can! doOnCancel-zoned'))
                .listen(null);

            Stream.value(1)
                .doOnCancel(
                    () => throw Exception('catch me if you can! doOnCancel'))
                .listen(null)
                .cancel();
          },
          (Object e, StackTrace s) => expect(e),
        );

        Stream.value(1)
            .doOnDone(() => throw Exception('catch me if you can! doOnDone'))
            .listen(
              null,
              onError: (Object e, StackTrace s) => expect(e),
            );

        Stream.value(1)
            .doOnEach((_) => throw Exception('catch me if you can! doOnEach'))
            .listen(
              null,
              onError: (Object e, StackTrace s) => expect(e),
            );

        Stream.value(1)
            .doOnPause(() => throw Exception('catch me if you can! doOnPause'))
            .listen(null, onError: (Object e, StackTrace s) => expect(e))
          ..pause()
          ..resume();

        Stream.value(1)
            .doOnResume(() => throw Exception('catch me if you can! doOnResume'))
            .listen(null, onError: (Object e, StackTrace s) => expect(e))
          ..pause()
          ..resume();
      });

      test(
          'doOnListen correctly allows subscribing multiple times on a broadcast stream',
          () {
        final controller = StreamController<int>.broadcast();
        final stream = controller.stream.doOnListen(() {});

        // Synchronous test body: the close future is intentionally not
        // awaited.
        unawaited(controller.close());

        expect(stream);
        expect(stream);
      });

      test('issue/389/1', () {
        final controller = StreamController<int>.broadcast();
        final stream = controller.stream.doOnListen(() {});

        expect(stream);
        expect(stream);

        // Synchronous test body: the close future is intentionally not
        // awaited.
        unawaited(controller.close());
      });

      test('issue/389/2', () {
        final controller = StreamController<int>();
        var isListening = false;

        final stream = controller.stream.doOnListen(() {
          isListening = true;
        });

        // Must not be awaited: a single-subscription controller's close
        // future only completes once the done event has been delivered to a
        // listener, and nothing is listening yet.
        unawaited(controller.close());

        expect(stream);

        expect(isListening);

        expect(() => stream.listen(null));
      });

      test('Rx.do accidental broadcast', () async {
        final controller = StreamController<int>();

        final stream = controller.stream.doOnEach((_) {});

        // Second listen targets a stream built on a single-subscription
        // controller; the closure is handed to `expect` rather than run here.
        stream.listen(null);
        expect(() => stream.listen(null));

        controller.add(1);
      });

      test('nested doOnX', () async {
        final completer = Completer<void>();
        final stream =
            Rx.range(0, 30).interval(const Duration(milliseconds: 100));
        final result = <String>[];
        const expectedOutput = [
          'A: 0',
          'B: 0',
          'pause',
          'A: 1',
          'B: 1',
          'A: 2',
          'B: 2',
          'A: 3',
          'B: 3',
          'A: 4',
          'B: 4',
          'A: 5',
          'B: 5',
          'pause',
          'A: 6',
          'B: 6',
          'A: 7',
          'B: 7',
          'A: 8',
          'B: 8',
          'A: 9',
          'B: 9',
          'A: 10',
          'B: 10',
          'pause',
          'A: 11',
          'B: 11',
          'A: 12',
          'B: 12',
          'A: 13',
          'B: 13',
          'A: 14',
          'B: 14',
          'A: 15',
          'B: 15',
          'pause',
          'A: 16',
          'B: 16',
          'A: 17',
        ];
        late StreamSubscription<int> subscription;

        // Records one entry and, once as many entries as `expectedOutput`
        // have been collected, cancels the subscription and releases the
        // test.
        void addToResult(String value) {
          result.add(value);

          if (result.length == expectedOutput.length) {
            // Called from inside a stream callback, so the cancel future is
            // deliberately not awaited.
            unawaited(subscription.cancel());
            completer.complete();
          }
        }

        subscription = Stream.value(1)
            .exhaustMap((_) => stream.doOnData((data) => addToResult('A: $data')))
            .doOnPause(() => addToResult('pause'))
            .doOnData((data) => addToResult('B: $data'))
            .take(expectedOutput.length)
            .listen((value) {
          // Pause for two seconds on every multiple of five.
          if (value % 5 == 0) {
            subscription.pause(Future<void>.delayed(const Duration(seconds: 2)));
          }
        });

        await completer.future;

        expect(result);
      });

      test('doOnData nullable', () {
        nullableTest<String?>(
          (s) => s.doOnData((d) {}),
        );
      });
    });
  }
}